{
  "metadata" : {
    "name" : "Clustering Genomes using Adam and MLLib",
    "user_save_timestamp" : "1970-01-01T00:00:00.000Z",
    "auto_save_timestamp" : "1970-01-01T00:00:00.000Z",
    "language_info" : {
      "name" : "scala",
      "file_extension" : "scala",
      "codemirror_mode" : "text/x-scala"
    },
    "trusted" : true,
    "customLocalRepo" : null,
    "customRepos" : null,
    "customDeps" : null,
    "customImports" : null,
    "customArgs" : null,
    "customSparkConf" : null
  },
  "cells" : [ {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "### Set the local dependencies repository"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true
    },
    "cell_type" : "code",
    "source" : ":local-repo /tmp/spark-notebook/repo",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "### ADAM dependencies for the notebook"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true
    },
    "cell_type" : "code",
    "source" : ":dp org.bdgenomics.adam % adam-core % 0.15.0\n- org.apache.hadoop % hadoop-client %   _\n- org.apache.spark  % spark-core    %   _\n- org.scala-lang    %     _         %   _\n- org.scoverage     %     _         %   _\n+ org.apache.spark  %  spark-mllib_2.10  % 1.2.1",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "Get the master hostname, define a function to build hdfs urls and reset the spark context with appropriate configuration"
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "### Get the master hostname and define a function to build hdfs urls"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true
    },
    "cell_type" : "code",
    "source" : "import sys.process._\nval master = (\"ec2-metadata --public-hostname\"!!).drop(\"public-hostname: \".size).mkString.trim\ndef hu(s:String) = s\"hdfs://$master:9010/temp/$s\"",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "### Set the AWS keys to get genotypes from S3"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true
    },
    "cell_type" : "code",
    "source" : "val fs_s3_awsAccessKeyId      = sys.env.get(\"AWS_ACCESS_KEY_ID\").getOrElse(\"<hard-code-one>\")\nval fs_s3_awsSecretAccessKey  = sys.env.get(\"AWS_SECRET_ACCESS_KEY\").getOrElse(\"<hard-code-one>\")",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "### Reset the spark context with complete configuration"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true
    },
    "cell_type" : "code",
    "source" : "reset(lastChanges = _.set(\"spark.serializer\", \"org.apache.spark.serializer.KryoSerializer\")\n                     .set(\"spark.kryo.registrator\", \"org.bdgenomics.adam.serialization.ADAMKryoRegistrator\")\n                     .set(\"spark.kryoserializer.buffer.mb\", \"4\")\n                     .set(\"spark.kryo.referenceTracking\", \"true\")\n                     .setMaster(s\"spark://$master:7077\")\n                     .setAppName(\"ADAM 1000genomes\")\n                     .set(\"spark.executor.memory\", \"13g\")\n                     .set(\"fs.s3.awsAccessKeyId\", fs_s3_awsAccessKeyId)\n                     .set(\"fs.s3.awsSecretAccessKey\", fs_s3_awsSecretAccessKey)\n)",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true
    },
    "cell_type" : "code",
    "source" : "import org.apache.hadoop.fs.{FileSystem, Path}\n\nimport org.bdgenomics.adam.converters.{ VCFLine, VCFLineConverter, VCFLineParser }\nimport org.bdgenomics.formats.avro.{Genotype, FlatGenotype}\nimport org.bdgenomics.adam.models.VariantContext\nimport org.bdgenomics.adam.rdd.ADAMContext._\nimport org.bdgenomics.adam.rdd.variation.VariationContext._\nimport org.bdgenomics.adam.rdd.ADAMContext\n  \nimport org.apache.spark.rdd.RDD",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true
    },
    "cell_type" : "markdown",
    "source" : "### Genotypes\nCreate a Genotype RDD from an ADAM formatted 1000genomes chromosome (22 here) stored on s3"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true
    },
    "cell_type" : "code",
    "source" : "val adamFileOnS3 = \"s3n://med-at-scale/1000genomes/ALL.chr22.integrated_phase1_v3.20101123.snps_indels_svs.genotypes.vcf.adam\"",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true
    },
    "cell_type" : "code",
    "source" : "val gts:RDD[Genotype] = sparkContext.adamLoad(adamFileOnS3)",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "Make a copy on hdfs if required to speed up a bit..."
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true
    },
    "cell_type" : "code",
    "source" : "gts.adamParquetSave(hu(\"chr22.adam\"))",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true
    },
    "cell_type" : "code",
    "source" : "val gts:RDD[Genotype] = sparkContext.adamLoad(hu(\"chr22.adam\"))\ngts.cache()",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "### Filter variants with position in 16,000,000-17,000,000"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true
    },
    "cell_type" : "code",
    "source" : "val start = 16000000\nval end   = 17000000\nval sampledGts = gts.filter(g => (g.getVariant.getStart >= start && g.getVariant.getEnd <= end) )",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "Save a local copy of the sample to speedup processing"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true
    },
    "cell_type" : "code",
    "source" : "sampledGts.adamParquetSave(hu(\"chr22-sample.adam\"))",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true
    },
    "cell_type" : "code",
    "source" : "val sampledGts:RDD[Genotype] = sparkContext.adamLoad(hu(\"chr22-sample.adam\"))",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true
    },
    "cell_type" : "markdown",
    "source" : "### Population: List of populations selected for analysis"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true
    },
    "cell_type" : "code",
    "source" : "// populations to select\nval pops = Set(\"GBR\", \"ASW\", \"CHB\")",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "We download on the locafilesystem the list of samples IDs and correspondig populations"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true
    },
    "cell_type" : "code",
    "source" : "val panelFile = \"/vol0/data/ALL.panel\"\ns\"wget ftp://ftp.1000genomes.ebi.ac.uk/vol1/ftp/release/20130502/integrated_call_samples_v3.20130502.ALL.panel -O ${panelFile}\"!!",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true
    },
    "cell_type" : "markdown",
    "source" : "### Panel"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true
    },
    "cell_type" : "code",
    "source" : "import scala.io.Source\ndef extract(filter: (String, String) => Boolean= (s, t) => true) = Source.fromFile(panelFile).getLines().map( line => {\n  val toks = line.split(\"\\t\").toList\n  toks(0) -> toks(1)\n}).toMap.filter( tup => filter(tup._1, tup._2) )\n  \n// panel extract from file, filtering by the 2 populations\ndef panel: Map[String,String] = \n  extract((sampleID: String, pop: String) => pops.contains(pop)) \n  \n// broadcast the panel \nval bPanel = sparkContext.broadcast(panel)",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "### Filter genotypes further on selected populations"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true
    },
    "cell_type" : "code",
    "source" : "val finalGts = sampledGts.filter(g =>  bPanel.value.contains(g.getSampleId))",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true
    },
    "cell_type" : "markdown",
    "source" : "### Count samples"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true
    },
    "cell_type" : "code",
    "source" : "val sampleCount = finalGts.map(_.getSampleId.toString.hashCode).distinct.count\ns\"#Samples: $sampleCount\"",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true
    },
    "cell_type" : "markdown",
    "source" : "### Ensuring completness"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true
    },
    "cell_type" : "code",
    "source" : "//utils\nimport scala.collection.JavaConverters._\nimport org.bdgenomics.formats.avro._\ndef variantId(g:Genotype):String = {\n  val name = g.getVariant.getContig.getContigName\n    val start = g.getVariant.getStart\n    val end = g.getVariant.getEnd\n    s\"$name:$start:$end\"\n}\ndef asDouble(g:Genotype):Double = g.getAlleles.asScala.count(_ != GenotypeAllele.Ref)",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true
    },
    "cell_type" : "code",
    "source" : "import org.apache.spark.SparkContext._\n// A VARIANT SHOULD HAVE sampleCount GENOTYPES\nval variantsById = finalGts.keyBy(g => variantId(g).hashCode).groupByKey.cache\nval missingVariantsRDD = variantsById.filter { case (k, it) => it.size != sampleCount }.keys\n\n// IF SAVING LIST OF MISSING VARIANTS IS REQUIRED...\n//missingVariantsRDD.saveAsObjectFile(hu(\"/model/missing-variants\"))\n\n// could be broadcased as well...\nval missingVariants = missingVariantsRDD.collect().toSet",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true
    },
    "cell_type" : "code",
    "source" : "val completeGts = finalGts.filter { g => ! (missingVariants contains variantId(g).hashCode) }",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "### Make dataframe"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true
    },
    "cell_type" : "code",
    "source" : "val sampleToData: RDD[(String, (Double, Int))] =\n    completeGts.map { g => (g.getSampleId.toString, (asDouble(g), variantId(g).hashCode)) }\n\nval groupedSampleToData = sampleToData.groupByKey",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true
    },
    "cell_type" : "code",
    "source" : "import org.apache.spark.mllib.linalg.{Vector=>MLVector, Vectors}\ndef makeSortedVector(gts: Iterable[(Double, Int)]): MLVector = Vectors.dense( gts.toArray.sortBy(_._2).map(_._1) )\n\nval dataPerSampleId:RDD[(String, MLVector)] =\n    groupedSampleToData.mapValues { it =>\n        makeSortedVector(it)\n    }\n\nval dataFrame:RDD[MLVector] = dataPerSampleId.values",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "### Train a 3 clusters Kmeans (10 iterations)"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true
    },
    "cell_type" : "code",
    "source" : "import org.apache.spark.mllib.clustering.{KMeans,KMeansModel}\nval model: KMeansModel = KMeans.train(dataFrame, 3, 10)",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "### Now Predict and compute confusion matrix"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true
    },
    "cell_type" : "code",
    "source" : "val predictions: RDD[(String, (Int, String))] = dataPerSampleId.map(elt => {\n    (elt._1, ( model.predict(elt._2), bPanel.value.getOrElse(elt._1, \"\"))) \n})",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true
    },
    "cell_type" : "code",
    "source" : "val confMat = predictions.collect.toMap.values\n    .groupBy(_._2).mapValues(_.map(_._1))\n    .mapValues(xs => (1 to 3).map( i => xs.count(_ == i-1)).toList)",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true
    },
    "cell_type" : "code",
    "source" : "<table>\n<tr><td></td><td>#0</td><td>#1</td><td>#2</td></tr>\n{ for (popu <- confMat) yield\n  <tr><td>{popu._1}</td> { for (cnt <- popu._2) yield \n    <td>{cnt}</td>\n  }\n  </tr>\n}\n</table>",
    "outputs" : [ ]
  } ],
  "nbformat" : 4
}